//--------------------------------------------------------------------------
//
// Copyright (c) Microsoft Corporation. All rights reserved.
//
// File: LimitedConcurrencyTaskScheduler.cs
//
//--------------------------------------------------------------------------
using System.Collections.Generic;
using System.Linq;
namespace System.Threading.Tasks.Schedulers
{
    /// <summary>
    /// Provides a task scheduler that ensures a maximum concurrency level while
    /// running on top of the ThreadPool.
    /// </summary>
    public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
    {
        /// <summary>Whether the current thread is processing work items.</summary>
        /// <remarks>
        /// Marked [ThreadStatic] so that each thread has its own flag; a plain static
        /// would let one worker's "true" enable inlining on every other thread.
        /// </remarks>
        [ThreadStatic]
        private static bool _currentThreadIsProcessingItems;
        /// <summary>The list of tasks to be executed.</summary>
        private readonly LinkedList<Task> _tasks = new LinkedList<Task>(); // protected by lock(_tasks)
        /// <summary>The maximum concurrency level allowed by this scheduler.</summary>
        private readonly int _maxDegreeOfParallelism;
        /// <summary>The number of worker delegates currently queued to, or running on, the ThreadPool.</summary>
        private int _delegatesQueuedOrRunning = 0; // protected by lock(_tasks)

        /// <summary>
        /// Initializes an instance of the LimitedConcurrencyLevelTaskScheduler class with the
        /// specified degree of parallelism.
        /// </summary>
        /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism provided by this scheduler.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxDegreeOfParallelism"/> is less than 1.
        /// </exception>
        public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
        {
            if (maxDegreeOfParallelism < 1) throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
            _maxDegreeOfParallelism = maxDegreeOfParallelism;
        }

        /// <summary>Queues a task to the scheduler.</summary>
        /// <param name="task">The task to be queued.</param>
        protected sealed override void QueueTask(Task task)
        {
            // Add the task to the list of tasks to be processed. If there aren't enough
            // delegates currently queued or running to process tasks, schedule another.
            lock (_tasks)
            {
                _tasks.AddLast(task);
                if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
                {
                    // Count the worker before it is queued so that concurrent callers,
                    // serialized by the lock, never exceed the configured maximum.
                    ++_delegatesQueuedOrRunning;
                    NotifyThreadPoolOfPendingWork();
                }
            }
        }

        /// <summary>
        /// Informs the ThreadPool that there's work to be executed for this scheduler.
        /// </summary>
        private void NotifyThreadPoolOfPendingWork()
        {
            ThreadPool.UnsafeQueueUserWorkItem(_ =>
            {
                // Note that the current thread is now processing work items.
                // This is necessary to enable inlining of tasks into this thread.
                _currentThreadIsProcessingItems = true;
                try
                {
                    // Process all available items in the queue.
                    while (true)
                    {
                        Task item;
                        lock (_tasks)
                        {
                            // When there are no more items to be processed,
                            // note that we're done processing, and get out.
                            if (_tasks.Count == 0)
                            {
                                --_delegatesQueuedOrRunning;
                                break;
                            }

                            // Get the next item from the queue
                            item = _tasks.First.Value;
                            _tasks.RemoveFirst();
                        }

                        // Execute the task we pulled out of the queue (outside the lock,
                        // so other threads can keep queueing and dequeueing meanwhile).
                        base.TryExecuteTask(item);
                    }
                }
                // We're done processing items on the current thread
                finally { _currentThreadIsProcessingItems = false; }
            }, null);
        }

        /// <summary>Attempts to execute the specified task on the current thread.</summary>
        /// <param name="task">The task to be executed.</param>
        /// <param name="taskWasPreviouslyQueued">
        /// Whether the task was previously handed to <see cref="QueueTask"/>; if so it is
        /// removed from the pending list before being run inline.
        /// </param>
        /// <returns>Whether the task could be executed on the current thread.</returns>
        protected sealed override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // If this thread isn't already processing a task, we don't support inlining
            if (!_currentThreadIsProcessingItems) return false;

            // If the task was previously queued, remove it from the queue. If it is no
            // longer there, a worker has already claimed it, so decline to inline it.
            if (taskWasPreviouslyQueued && !TryDequeue(task)) return false;

            // Try to run the task.
            return base.TryExecuteTask(task);
        }

        /// <summary>Attempts to remove a previously scheduled task from the scheduler.</summary>
        /// <param name="task">The task to be removed.</param>
        /// <returns>Whether the task could be found and removed.</returns>
        protected sealed override bool TryDequeue(Task task)
        {
            lock (_tasks) return _tasks.Remove(task);
        }

        /// <summary>Gets the maximum concurrency level supported by this scheduler.</summary>
        public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }

        /// <summary>Gets an enumerable of the tasks currently scheduled on this scheduler.</summary>
        /// <returns>An enumerable of the tasks currently scheduled.</returns>
        /// <exception cref="NotSupportedException">
        /// The lock on the task list could not be acquired without blocking.
        /// </exception>
        protected sealed override IEnumerable<Task> GetScheduledTasks()
        {
            bool lockTaken = false;
            try
            {
                // Non-blocking acquire: give up rather than wait if the lock is held.
                Monitor.TryEnter(_tasks, ref lockTaken);
                // Return a snapshot so the caller never enumerates the live list.
                if (lockTaken) return _tasks.ToArray();
                else throw new NotSupportedException();
            }
            finally
            {
                if (lockTaken) Monitor.Exit(_tasks);
            }
        }
    }
}